package com.xj.kafka.consumer;

import kafka.consumer.ConsumerConfig;

import java.util.HashMap;
import java.util.Map;

/**
 * User: bjxiajun
 * Date: 13-10-18
 * Time: 下午1:31
 * 处理商品被点击后的消息
 */
public class ItemBrowsingConsumer extends BaseConsumer {
    private String topics;
    /**
     * 单品页消息接收配置
     *
     * @param consumerConfig
     * @param topics 接收的主题
     * @param threadNum 每个主题stream使用的线程数
     */
    public ItemBrowsingConsumer(ConsumerConfig consumerConfig, String topics,int threadNum) {
        super(consumerConfig,threadNum);
        this.topics = topics;
    }

    public void run() {
        this.start(getTopicMap(),new ItemMessageDispose());
    }

    private final Map<String, Integer> getTopicMap() {
        Map<String, Integer> topicMap = null;
        if (topics != null) {
            topicMap=new HashMap<String, Integer>();
            String[] topics_ = topics.split(",");
            for (int i = 0; i < topics_.length; i++) {
                String topicStream = topics_[i];
                String[] topic_stream = topicStream.split(":");
                topicMap.put(topic_stream[0], Integer.parseInt(topic_stream[1]));
            }
        }
        return topicMap;
    }
}
